[Fix](pyudf) Fix some pyudf issues #62249

Draft
linrrzqqq wants to merge 11 commits into apache:master from linrrzqqq:pyudf-test

Conversation

@linrrzqqq
Collaborator

@linrrzqqq linrrzqqq commented Apr 8, 2026

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

  1. After the cache file for a UDF is deleted, calling the UDF again raises an error instead of re-downloading the package.
</failure>
  </testcase>
  <testcase classname="stress.python_udf" name="python_udf_module_cache_isolation" time="100.051">
    <failure message="flowId=stress/python_udf/udf_module_cache_isolation.groovy#python_udf_module_cache_isolation" type="SuiteFailure">    logger.info("Scene 9a: function works before cache deletion, result=${r9a[0][0]}")

    // Delete the UDF cache files on all BEs (delete files only, keep the directory structure)
    aliveBEs.each { be ->
        execSSH(be.Host, "find /mnt/hdd01/PERFORMANCE_ENV/be/lib/udf/ -mindepth 2 -type f -delete 2>/dev/null; true")
    }
    logger.info("Scene 9b: UDF cache files deleted on all BEs")

    // Call again; the BE should automatically re-download the zip and execute it
    def r9b = sql "SELECT cache_recover_${runId}(10)"
^^^^^^^^^^^^^^^^^^^^^^^^^^ERROR LINE^^^^^^^^^^^^^^^^^^^^^^^^^^
    assert r9b[0][0] == 787, "cache recover after delete: expected 787, got ${r9b[0][0]}"
    logger.info("Scene 9c: function works after cache deletion, result=${r9b[0][0]}")

    // Batch-verify consistency after recovery
    def r9c = sql "SELECT cache_recover_${runId}(val) FROM t_cache_batch WHERE id <= 5 ORDER BY id"
    for (int i = 0; i < 5; i++) {
        def expected = (i + 1) + 777
        assert r9c[i][0] == expected, "cache recover batch[${i}]: expected ${expected}, got ${r9c[i][0]}"
    }
    logger.info("Scene 9 PASS: cache auto-recovery after manual deletion — all results consistent")

java.sql.SQLException: errCode = 2, detailMessage = (172.20.49.73)[RUNTIME_ERROR]Flight stream finish failed with message: Directory contains no Python (.py) files: /mnt/hdd01/PERFORMANCE_ENV/be/lib/udf/5/1775636895621.2cfe161cc9c6f0b2ca5e7dcb6286e924.cache_recover. Detail: Python exception: Traceback (most recent call last):
  File "pyarrow/_flight.pyx", line 2315, in pyarrow._flight._do_exchange
  File "/mnt/hdd01/PERFORMANCE_ENV/be/plugins/python_udf/python_server.py", line 2493, in do_exchange
    self._handle_exchange_udf(python_udf_meta, reader, writer)
  File "/mnt/hdd01/PERFORMANCE_ENV/be/plugins/python_udf/python_server.py", line 2005, in _handle_exchange_udf
    loader = UDFLoaderFactory.get_loader(python_udf_meta)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/hdd01/PERFORMANCE_ENV/be/plugins/python_udf/python_server.py", line 1161, in get_loader
    if UDFLoaderFactory.check_module(location):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/hdd01/PERFORMANCE_ENV/be/plugins/python_udf/python_server.py", line 1196, in check_module
    raise ValueError(
ValueError: Directory contains no Python (.py) files: /mnt/hdd01/PERFORMANCE_ENV/be/lib/udf/5/1775636895621.2cfe161cc9c6f0b2ca5e7dcb6286e924.cache_recover 
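The traceback above shows `check_module` treating an empty cache directory as a fatal error. One possible shape of the fix (a minimal sketch — `refetch` and `load_udf_module` are hypothetical names, not the actual `python_server.py` API) is to treat the empty directory as a cache miss and re-extract the package before failing:

```python
import os

def find_py_files(location):
    """Return all .py files under a UDF cache directory."""
    hits = []
    for root, _dirs, files in os.walk(location):
        hits.extend(os.path.join(root, f) for f in files if f.endswith(".py"))
    return hits

def load_udf_module(location, refetch):
    """Treat an empty cache directory as a cache miss instead of a fatal error.

    `refetch` is a hypothetical callback that re-downloads and re-extracts the
    UDF package into `location` (in Doris the BE would drive this); only if
    the files still cannot be restored do we raise the original error.
    """
    if not find_py_files(location):
        refetch(location)  # cache miss: try to restore the files first
    files = find_py_files(location)
    if not files:
        raise ValueError(f"Directory contains no Python (.py) files: {location}")
    return files
```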
  2. Errors raised in the Python code are not propagated to the client correctly.
Doris> CREATE FUNCTION py_err_stats_test(INT)
    -> RETURNS INT
    -> PROPERTIES (
    ->     "type"="PYTHON_UDF",
    ->     "symbol"="evaluate",
    ->     "runtime_version"="3.12.11",
    ->     "always_nullable"="true"
    -> ) AS $$
    -> def evaluate(x):
    ->     raise TypeError("consistent_error_42")
    -> $$;
Query OK, 0 rows affected (0.004 sec)

-- Expected: an error; actual: NULL is returned and the Python exception is swallowed
Doris> SELECT py_err_stats_test(1);
+----------------------+
| py_err_stats_test(1) |
+----------------------+
|                 NULL |
+----------------------+
1 row in set (0.070 sec)
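The session above suggests the exception from `evaluate()` is being mapped to NULL rather than surfaced. A minimal sketch of the two policies (hypothetical `run_udf` helper, not the actual server code):

```python
def run_udf(evaluate, rows, propagate_errors):
    """Two error-handling policies for per-row UDF evaluation.

    With propagate_errors=False (the buggy behavior observed above), any
    exception from the user's evaluate() is silently turned into NULL.
    With propagate_errors=True, the error surfaces to the caller, which is
    what the SELECT above should have reported.
    """
    out = []
    for row in rows:
        try:
            out.append(evaluate(row))
        except Exception as e:
            if propagate_errors:
                raise RuntimeError(f"Python UDF raised: {e!r}") from e
            out.append(None)  # swallowed: the client only ever sees NULL
    return out
```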
  3. After the Python process crashes, subsequent UDF calls fail instead of recovering.
 </testcase>
  <testcase classname="stress.python_udf" name="python_udf_cross_feature_import_storage" time="0.430">
    <failure message="flowId=stress/python_udf/cross_feature_import_storage.groovy#python_udf_cross_feature_import_storage" type="SuiteFailure">    def pyVer = System.getenv('python_version') ?: '3.12.11'
    def db = context.config.getDbNameByFile(context.file)
    sql "DROP DATABASE IF EXISTS ${db} FORCE"
    sql "CREATE DATABASE IF NOT EXISTS ${db}"
    sql "USE ${db}"

    if (!python_udf_require_olap_table("python_udf_cross_feature_import_storage.src_t")) {
        return
    }
    if (!python_udf_require_inline_function("python_udf_cross_feature_import_storage.inline_runtime", pyVer)) {
^^^^^^^^^^^^^^^^^^^^^^^^^^ERROR LINE^^^^^^^^^^^^^^^^^^^^^^^^^^
        return
    }

    // ===== 1. Use the UDF in INSERT INTO ... SELECT =====
    sql "DROP TABLE IF EXISTS src_t"
    sql """
        CREATE TABLE src_t (id INT, v INT)
        DISTRIBUTED BY HASH(id) BUCKETS 1
        PROPERTIES("replication_num"="1")
    """

java.lang.IllegalStateException: PYTHON_UDF_BLOCKED suite=python_udf_cross_feature_import_storage scenario=python_udf_cross_feature_import_storage.inline_runtime reason=inline probe failed. reason=errCode = 2, detailMessage = (172.20.49.73)[INTERNAL_ERROR]IOError: Flight stream finish failed with gRPC code 14, message: failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/doris_python_udf_55799.sock: Connection refused
  4. Parameterless (zero-argument) UDF calls are not supported.
  </testcase>
  <testcase classname="stress.python_udf" name="python_udf_thirdparty_packages" time="6.084">
    <failure message="flowId=stress/python_udf/udf_thirdparty_packages.groovy#python_udf_thirdparty_packages" type="SuiteFailure">            versions["pandas"] = "not_found"
        try:
            import jieba
            versions["jieba"] = jieba.__version__
        except:
            versions["jieba"] = "not_found"
        return json.dumps(versions)
    \$\$
    """
    def rVer = sql("SELECT py_pkg_versions()")
^^^^^^^^^^^^^^^^^^^^^^^^^^ERROR LINE^^^^^^^^^^^^^^^^^^^^^^^^^^
    assert rVer[0][0] != null
    def verJson = new groovy.json.JsonSlurper().parseText(rVer[0][0].toString())
    logger.info("Package versions: python=${verJson.python}, numpy=${verJson.numpy}, pandas=${verJson.pandas}, jieba=${verJson.jieba}")
    // Verify the packages are actually installed (not "not_found")
    assert verJson.numpy != 'not_found', "numpy should be installed"
    assert verJson.pandas != 'not_found', "pandas should be installed"
    assert verJson.jieba != 'not_found', "jieba should be installed"
    logger.info("Package version verification: PASS")
    logger.warn("Package version info: skipped — PRODUCT LIMITATION: Python UDF does not support zero-argument functions")


java.sql.SQLException: errCode = 2, detailMessage = (172.20.49.73)[INVALID_ARGUMENT]Python UDF input types is empty
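The `Python UDF input types is empty` error points at the zero-argument case: a vectorized UDF is normally driven by zipping its argument columns, so with no columns there is nothing to infer the row count from. A sketch of how the server side could handle it (hypothetical `eval_batch` helper; the row count would come from the incoming Arrow RecordBatch's `num_rows`):

```python
def eval_batch(evaluate, arg_columns, num_rows):
    """Evaluate a UDF over a batch, including the zero-argument case.

    arg_columns is a list of equally sized columns; for a zero-argument UDF
    it is empty, so the row count must be carried explicitly and evaluate()
    invoked once per row.
    """
    if arg_columns:
        return [evaluate(*args) for args in zip(*arg_columns)]
    # zero-argument UDF: no column to infer the row count from
    return [evaluate() for _ in range(num_rows)]
```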

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100.00% (67/67) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 71.85% (26719/37188)
Line Coverage 54.84% (283032/516076)
Region Coverage 51.89% (234450/451828)
Branch Coverage 53.38% (101447/190046)

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 86.54% (90/104) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.73% (27428/37202)
Line Coverage 57.42% (296481/516358)
Region Coverage 54.73% (247442/452095)
Branch Coverage 56.35% (107160/190175)

@linrrzqqq force-pushed the pyudf-test branch 2 times, most recently from 25ea1db to 1f85b7a on April 13, 2026 09:54
@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 84.80% (106/125) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.64% (27388/37192)
Line Coverage 57.28% (295713/516250)
Region Coverage 54.36% (245718/452003)
Branch Coverage 56.07% (106590/190113)

@linrrzqqq changed the title from "[Fix](pyudf) Fix functions throw errors after manually deleting udf cache files" to "[Fix](pyudf) Fix some pyudf issues" on Apr 14, 2026
@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/6) 🎉
Increment coverage report
Complete coverage report

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/6) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 100.00% (6/6) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

BE UT Coverage Report

Increment line coverage 45.61% (26/57) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 53.30% (20358/38194)
Line Coverage 36.84% (191810/520595)
Region Coverage 33.16% (149242/450007)
Branch Coverage 34.27% (65270/190431)

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 45.61% (26/57) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.57% (27519/37407)
Line Coverage 57.35% (297646/518998)
Region Coverage 54.50% (247532/454164)
Branch Coverage 56.11% (107176/191004)

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 86.51% (109/126) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 71.49% (26754/37425)
Line Coverage 53.86% (279634/519148)
Region Coverage 47.21% (214968/455372)
Branch Coverage 50.57% (97369/192550)

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 5.41% (6/111) 🎉
Increment coverage report
Complete coverage report

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 86.51% (109/126) 🎉
Increment coverage report
Complete coverage report

@linrrzqqq
Collaborator Author

run buildall

@hello-stephen
Contributor

run buildall

@wyxxxcat
Collaborator

/review

@github-actions Bot left a comment

I found blocking correctness issues in the PR.

Critical checkpoint conclusions:

  • Goal/test coverage: the PR targets Python UDF/UDAF/UDTF empty-argument support, deterministic metadata, error propagation, process recovery, and cache cleanup. It adds focused regression/unit coverage, but the implementation still misses BE enforcement for Python nondeterminism and produces non-recreatable SHOW CREATE FUNCTION SQL for Python functions.
  • Scope/focus: the change is not as small as possible; several production INFO logs with [pyudf-test] remain in hot analysis/execution paths.
  • Concurrency/lifecycle: the new UDAF cache broadcast runs from an agent task worker and can fail when a Python process dies between liveness check and Flight action; the new void wrapper converts that recoverable failure into an uncaught exception risk.
  • Configuration/incompatibility: no config or storage-format changes found.
  • Parallel code paths: Python UDF/UDAF/UDTF deterministic metadata is passed through FE/Nereids, but the BE execution path still has no deterministic flag, so constant evaluation can still treat nondeterministic Python UDFs as constants.
  • Conditional checks/error handling: Python UDAF error propagation is improved, but the new cleanup wrapper violates the repository error-handling pattern by throwing from a task callback instead of returning/logging Status.
  • Test coverage: positive tests were added, but they do not cover BE constantness for nondeterministic zero-arg Python UDFs or re-executing SHOW CREATE FUNCTION output for Python inline/module UDF/UDAF/UDTF.
  • Observability: existing diagnostics are sufficient for the reported runtime paths, but the leftover [pyudf-test] INFO logs should be removed or demoted before merge.
  • Transaction/persistence/data correctness: no transaction/data visibility path changes found.
  • User focus: no additional user-provided review focus was present.

&input_batch, _timezone_obj));
if (arguments.empty()) {
    input_batch = arrow::RecordBatch::Make(schema, input_rows,
            std::vector<std::shared_ptr<arrow::Array>> {});

This adds zero-argument Python UDF execution, but BE still has no Python deterministic metadata. VectorizedFnCall::is_constant() only special-cases zero-arg JAVA_UDF, so a zero-arg Python UDF declared deterministic="false" can still be folded/evaluated once for a fragment and reused for all rows. A function like return random.random() or return time.time() would return one repeated value in SELECT py_rand() FROM t, despite the new FE/Nereids deterministic flag. Please pass/enforce the deterministic flag in the BE function constantness path, or at least disable constant treatment for zero-arg Python UDFs as Java UDFs do.
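The folding hazard the reviewer describes can be sketched in a few lines (hypothetical `execute` helper; a counter stands in for `random.random()` so the behavior is deterministic to check):

```python
import itertools

def execute(fn, num_rows, deterministic):
    """Why the BE must respect the deterministic flag for zero-arg UDFs.

    A constant-folding executor evaluates fn once and reuses the value for
    every row; that is only valid when fn is deterministic. Folding a
    nondeterministic zero-arg UDF repeats one value across all rows.
    """
    if deterministic:
        value = fn()                  # fold: evaluate once, broadcast
        return [value] * num_rows
    return [fn() for _ in range(num_rows)]  # nondeterministic: once per row

counter = itertools.count()
def py_rand():
    # stand-in for random.random(): yields a new value on every call
    return next(counter)
```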

if (clean_req.__isset.function_id && clean_req.function_id > 0) {
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
}

clear_udaf_state_cache() is a void wrapper that uses THROW_IF_ERROR around a broadcast to Python processes. This callback does not catch exceptions, so a normal race where a Python process dies after is_alive() but before FlightClient::Connect()/DoAction() can make the clean-cache agent task throw out of the worker instead of just logging the failed cleanup. The neighboring module-cache cleanup already returns Status and logs failures; please make the UDAF cleanup follow that pattern instead of throwing from this task callback.
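The pattern the reviewer asks for — report the failed cleanup instead of throwing from the task callback — looks roughly like this (a sketch with a hypothetical `broadcast` callable, not the actual C++ `Status` API):

```python
import logging

def clear_udaf_state_cache(function_id, broadcast):
    """Follow the neighboring module-cache pattern: log failures, don't throw.

    `broadcast` is a hypothetical callable that sends the clean-cache action
    to each Python process. It can legitimately fail when a process dies
    between the liveness check and the Flight call, so the agent-task
    callback must catch and log that instead of propagating an exception.
    """
    try:
        broadcast(function_id)
        return True
    except Exception as e:
        logging.warning("clear UDAF state cache failed for function %s: %s",
                        function_id, e)
        return False
```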

boolean isReturnNull = fn.getNullableMode() == NullableMode.ALWAYS_NULLABLE;
sb.append(",\n \"ALWAYS_NULLABLE\"=").append("\"" + isReturnNull + "\"");
sb.append(",\n \"DETERMINISTIC\"=").append("\"" + fn.isDeterministic() + "\"");
} else {

The Python branch still does not emit all properties required to recreate the function. CreateFunctionCommand requires runtime_version for every PYTHON_UDF, and inline Python functions also need the persisted functionCode rendered as AS $$...$$; with this output, SHOW CREATE FUNCTION followed by executing the shown SQL fails with Python runtime version is not set or recreates a module function without the inline body. For Python UDTFs, the same scalar path also prints CREATE FUNCTION ... RETURNS <item type> instead of CREATE TABLES FUNCTION ... RETURNS ARRAY<...>, losing table-function semantics. Please make the generated SQL round-trip for Python UDF/UDAF/UDTF before adding only the deterministic property.
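A round-trippable output for the Python branch would need to carry all of the properties the reviewer lists. A sketch of the required shape (hypothetical `show_create_python_function` helper; the real code builds this in Java):

```python
def show_create_python_function(name, arg_types, ret_type, props,
                                inline_code=None, is_table_function=False):
    """Emit SHOW CREATE FUNCTION SQL that can be re-executed.

    To round-trip, the SQL must include runtime_version in PROPERTIES, render
    inline functions with their persisted code as an AS $$...$$ body, and use
    CREATE TABLES FUNCTION ... RETURNS ARRAY<...> for table functions.
    """
    kind = "TABLES FUNCTION" if is_table_function else "FUNCTION"
    ret = f"ARRAY<{ret_type}>" if is_table_function else ret_type
    prop_sql = ",\n  ".join(f'"{k}"="{v}"' for k, v in props.items())
    sql = (f"CREATE {kind} {name}({', '.join(arg_types)})\n"
           f"RETURNS {ret}\n"
           f"PROPERTIES (\n  {prop_sql}\n)")
    if inline_code is not None:
        sql += f" AS $$\n{inline_code}\n$$"
    return sql + ";"
```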
